<?php
require_once __DIR__.'/autoload.php';

use common\BaseWorker;
use common\Base;
use Workerman\Connection\AsyncUdpConnection;
use \Workerman\Connection\AsyncTcpConnection;
use Workerman\Lib\Timer;
use common\Log;
use common\Node;

// Template worker: created without a socket address, it carries the shared
// state and the KRPC callbacks assigned further down in this file.
$server = new BaseWorker();

// $server->reusePort = true;

// Codec object used to encode/decode DHT packets (project class).
$server->protocol = new \Protocols\DHT();

// [host, port] pairs queried by joinDht() to enter the DHT network.
$server->bootstrap_nodes = [
    ['47.88.58.78', 6666],
    ['router.bittorrent.com', 6881],
    ['dht.transmissionbt.com', 6881],
    ['router.utorrent.com', 6881],
];

// Outstanding outbound queries: transaction id ('t') => action name.
// Filled by sendTo(), consumed by onMessage when the response arrives.
$server->conn_action = [];

// Only referenced from commented-out code in this file.
$server->table = [];

// Our own node id, and the node record advertised for this host.
$server->nid = Base::get_node_id();

$server->selfNodeInfo = new Node($server->nid, '47.88.58.78', 4788);

// NOTE(review): require_once evaluates to true (not the file's return value)
// when config.php was already included earlier, e.g. by autoload.php — confirm
// that cannot happen, or switch to require.
$server->config = require_once __DIR__.'/config/config.php';

/**
 * Entry point for every decoded packet: routes KRPC queries ('q'), responses
 * ('r') and errors ('e') to the matching handler on $server.
 *
 * Responses are matched to the query that caused them through
 * $server->conn_action (transaction id => action), which sendTo() fills in.
 *
 * @param mixed $connection connection the packet arrived on
 * @param mixed $data       decoded message; a falsy value means nothing usable
 */
$server->onMessage = function($connection,$data) use ($server){

    // Log::record('revice data from '.$connection->getRemoteAddress()." in local info: ".$connection->getLocalAddress());

    // Packets come from arbitrary remote hosts, so never assume the KRPC
    // fields exist: drop anything that is not an array with a string 'y' type.
    if(!is_array($data) || !isset($data['y']) || !is_string($data['y'])){
        return ;
    }

    // The transaction id is used as an array key below; only strings and
    // integers are valid keys (anything else would raise an offset error).
    $has_tid = isset($data['t']) && (is_string($data['t']) || is_int($data['t']));

    switch ($data['y']) {
        case 'q':
            // A query without a method name cannot be dispatched.
            if(!isset($data['q']) || !is_string($data['q'])){
                break;
            }
            switch ($data['q']) {
                case 'ping':
                    Log::record('ping query');
                    $server->onPing($connection,$data);
                    break;
                case 'find_node':
                    Log::record('find node query');
                    $server->onFindNode($connection,$data);
                    break;
                case 'get_peers':
                    Log::record('get peers query');
                    $server->onGetPeers($connection,$data);
                    break;
                case 'announce_peer':
                    // NOTE(review): no onAnnouncePeer handler is assigned in
                    // this file — confirm BaseWorker provides one.
                    Log::record('announce query');
                    $server->onAnnouncePeer($connection,$data);
                    break;
                default:
                    // Unknown query method: ignored.
                    break;
            }
            break;
        case 'r':
            // Only responses to a query we actually sent are processed.
            if($has_tid && isset($server->conn_action[$data['t']])){
                $connection->action = $server->conn_action[$data['t']];
                unset($server->conn_action[$data['t']]);

                switch ($connection->action) {
                    case 'ping':
                        // NOTE(review): onResponsePing is not assigned in this
                        // file — confirm BaseWorker provides one.
                        Log::record('ping response');
                        $server->onResponsePing($connection,$data);
                        break;
                    case 'find_node':
                        Log::record('find node response');
                        $server->onResponseFindNode($connection,$data);
                        break;
                    case 'get_peers':
                        Log::record('get peers response');
                        $server->onResponseGetPeers($connection,$data);
                        break;
                    case 'announce_peer':
                        // NOTE(review): onResponseAnnouncePeer is not assigned
                        // in this file — confirm BaseWorker provides one.
                        Log::record('announce response');
                        $server->onResponseAnnouncePeer($connection,$data);
                        break;
                    default:
                        // Unknown action: ignored.
                        break;
                }
            }
            break;
        case 'e':
            Log::record('revice error','error');
            // KRPC fields are frequently raw binary; without the partial-output
            // flag json_encode() returns false and the payload is lost from the log.
            Log::record('error data:'.PHP_EOL.\json_encode($data,JSON_PRETTY_PRINT|JSON_PARTIAL_OUTPUT_ON_ERROR));
            // An error reply ends the transaction too: forget the pending action.
            if($has_tid){
                unset($server->conn_action[$data['t']]);
            }
            break;
        default:
            // Unknown message type: ignored.
            break;
    }
    // $connection->close();

};



/**
 * Answer a "ping" query: echo the caller's transaction id back together
 * with our own node id.
 *
 * @param mixed $conn connection the query arrived on
 * @param array $data decoded query; 't' is the caller's transaction id
 */
$server->onPing = function($conn,$data) use ($server){
    $reply = [
        't' => $data['t'],
        'y' => 'r',
        'r' => ['id' => $server->nid],
    ];
    $conn->send($reply);
};

/**
 * Answer a "find_node" query: ask the local table service (127.0.0.1:226)
 * for nodes near the requested target and relay them to the caller.
 * If the table service reports an error, an error message (code 202) is
 * returned instead and a new bootstrap round is started.
 *
 * @param mixed $conn connection the query arrived on
 * @param array $data decoded query; reads $data['t'] and $data['a']['target']
 */
$server->onFindNode = function($conn,$data) use ($server){
    // 'target' is supplied by the remote peer; without it there is nothing
    // to look up, so the malformed query is dropped.
    if(!isset($data['a']['target'])){
        return ;
    }

    $table_conn = new AsyncTcpConnection('frame://127.0.0.1:226');
    $table_conn->onConnect = function($table_conn) use($data){
        $table_msg['action'] = 'getNodes';
        $table_msg['data']['target'] = $data['a']['target'];
        $table_conn->send(serialize($table_msg));
    };
    $table_conn->onMessage = function($table_conn,$nodes_data) use($data,$server,$conn){
        $nodes_data = unserialize($nodes_data);
        // One request per connection: release the socket now, otherwise every
        // incoming find_node query leaves an open TCP connection behind.
        $table_conn->close();
        // $nodes_data[] = $server->selfNodeInfo;
        if(isset($nodes_data['error'])){
            $msg['t'] = $data['t'];
            $msg['y'] = 'e';
            $msg['e'] = [202,$nodes_data['error']];
            $conn->send($msg);
            Log::record("get nodes error :".$nodes_data['error']);
            $server->joinDht();
        }else{
            // echo count($nodes_data);
            $msg['t'] = $data['t'];
            $msg['y'] = 'r';
            $msg['r'] = [
                'id'=>$server->nid,
                'nodes'=>Base::encode_nodes($nodes_data) 
            ];
            $conn->send($msg);
            // Log::record("get nodes status :ok");
        }
    };
    $table_conn->connect();
};


/**
 * Answer a "get_peers" query.
 *
 * The local table service (127.0.0.1:226) is asked for peers of the
 * info-hash. When it has some they are returned under "values"; otherwise
 * the same table connection is reused to fetch the closest nodes, which are
 * returned under "nodes", and an outbound peer search is started.
 *
 * @param mixed $conn connection the query arrived on
 * @param array $data decoded query; reads $data['t'] and $data['a']['info_hash']
 */
$server->onGetPeers = function($conn,$data) use ($server){

    // The info-hash is supplied by the remote peer; drop malformed queries.
    if(!isset($data['a']['info_hash']) || !is_string($data['a']['info_hash'])){
        return ;
    }

    $infohash = $data['a']['info_hash'];

    // Reply skeleton; 'values' or 'nodes' is filled in once the table answers.
    $conn_msg['t'] = $data['t'];
    $conn_msg['y'] = 'r';
    $conn_msg['r'] = [
        'id'=>$server->nid,
        'token'=>substr($infohash,0,2),
    ];

    $table_conn = new AsyncTcpConnection('frame://127.0.0.1:226');

    $table_conn->onConnect = function($table_conn) use ($infohash){
        $table_msg['action'] = 'getPeers';
        $table_msg['data']['infohash'] = $infohash;
        // Remember which request is in flight so onMessage can tell the
        // two stages (getPeers, then getNodes) apart.
        $table_conn->action = 'getPeers';
        $table_conn->send(serialize($table_msg));
    };

    $table_conn->onMessage = function($table_conn,$reply) use ($conn,$conn_msg,$server,$infohash){
        $reply = unserialize($reply);

        if($table_conn->action == 'getPeers'){
            if(!isset($reply['error'])){
                // Peers are known. BEP 5 names this key "values".
                $conn_msg['r']['values'] = Base::encode_peers($reply);
                $conn->send($conn_msg);
                $table_conn->close();
                return ;
            }

            // No peers stored: ask the table service (not the remote peer)
            // for the closest nodes, serialized like every other table request.
            $table_msg['action'] = 'getNodes';
            $table_msg['data']['target'] = $infohash;
            $table_conn->action = 'getNodes';
            $table_conn->send(serialize($table_msg));

            // Meanwhile start our own search for peers of this info-hash.
            $server->getPeers($infohash);
            return ;
        }

        if($table_conn->action == 'getNodes'){
            if(isset($reply['error'])){
                // The transaction id comes from the original query, and the
                // error text from the table reply.
                $msg['t'] = $conn_msg['t'];
                $msg['y'] = 'e';
                $msg['e'] = [202,$reply['error']];
                $conn->send($msg);
            }else{
                $conn_msg['r']['nodes'] = Base::encode_nodes($reply);
                $conn->send($conn_msg);
            }
            // Both stages done: release the table connection.
            $table_conn->close();
        }
    };

    $table_conn->connect();

    Log::record('find hash:'.bin2hex($infohash));

    // $server->onFindNode($conn,$data);
};

/**
 * Handle the response to one of our find_node queries: decode the node list
 * and hand every node to addNode().
 *
 * @param mixed $conn connection the response arrived on
 * @param array $data decoded response; reads $data['r']['nodes']
 */
$server->onResponseFindNode = function($conn,$data) use ($server){

    // The reply comes from a remote host and may lack the node list.
    if(!isset($data['r']['nodes'])){
        return ;
    }

    $node_info_list = Base::decode_nodes($data['r']['nodes']);
    foreach ($node_info_list as  $node_info) {
        $server->addNode($node_info);
    }

};

/**
 * Handle the response to one of our get_peers queries. Currently it only
 * logs that a response arrived; the payload is not processed.
 *
 * @param mixed $conn connection the response arrived on
 * @param array $data decoded response (unused)
 */
$server->onResponseGetPeers = function($conn,$data){
    // Double quotes so "\n" is a real newline; in single quotes the log
    // received a literal backslash followed by 'n'.
    Log::record("onResponseGetPeers called\n");
};

/**
 * Store one node in the local table service (127.0.0.1:226) by sending it
 * an 'addNode' request over a fresh connection.
 *
 * @param mixed $node_info node record as produced by Base::decode_nodes()
 */
$server->addNode = function($node_info){
    $table_msg['action'] = 'addNode';
    $table_msg['data'] = $node_info;
    $table_conn = new AsyncTcpConnection('frame://127.0.0.1:226');
    $table_conn->onConnect = function($conn) use ($table_msg){
        $conn->send(serialize($table_msg));
    };
    $table_conn->onMessage = function($conn,$data){
        // The acknowledgement carries nothing we use. Close the connection:
        // this runs once per discovered node, so leaving sockets open would
        // steadily exhaust file descriptors.
        // Log::record('add node status:'.$data['status'] );
        $conn->close();
    };
    $table_conn->connect();
};

$server->onWorkerStart = function() use ($server){

    // Worker bound to UDP port 4788 on all interfaces.
    $monitor = new BaseWorker('udp://0.0.0.0:4788');

    // Mirror codec, identity, bootstrap list and handlers from the template
    // worker, in this order. onAnnouncePeer, onResponsePing and
    // onResponseAnnouncePeer are intentionally not copied (they are not
    // assigned anywhere in this file).
    $shared_props = [
        'protocol',
        'nid',
        'bootstrap_nodes',
        'onPing',
        'onFindNode',
        'onGetPeers',
        'onResponseFindNode',
        'onResponseGetPeers',
        'onMessage',
    ];
    foreach ($shared_props as $shared_prop) {
        $monitor->$shared_prop = $server->$shared_prop;
    }


    /**
     * Bootstrap into the DHT: send a find_node query (random target) to
     * every configured bootstrap node.
     */
    $monitor->joinDht = function() use ($monitor){
        Log::record('join dht start');
        // if(count($server->table) == 0){
            foreach($monitor->bootstrap_nodes as $node){
                // Original intent: when the routing table is empty, join the
                // predefined DHT network using our forged node id.
                // gethostbyname() resolves the host name to an IPv4 address
                // (it returns the name unchanged when resolution fails).
                $monitor->findNode("udp://".gethostbyname($node[0]).":".$node[1]);
            }
        // }
        // $server->listen();
    };

    // onFindNode calls $server->joinDht() when the table lookup fails, so the
    // closure must be reachable through $server too (as is done for getPeers).
    $server->joinDht = $monitor->joinDht;

    /**
     * Send a find_node query to $address.
     *
     * @param string      $address destination, e.g. "udp://1.2.3.4:6881"
     * @param string|null $id      id of the node being queried; when given,
     *                             the target is a neighbour id derived from it,
     *                             otherwise a fresh random id is used
     */
    $monitor->findNode = function($address,$id = null) use ($monitor,$server){
        // Log::record('find node called');
        if(is_null($id)){
            $mid = Base::get_node_id();
        }else{
            // Otherwise forge an id adjacent to the queried node. The second
            // argument is our own id; the previous code passed an undefined
            // local variable ($nid) here.
            $mid = Base::get_neighbor($id, $server->nid);
        }
        $msg = array(
            't' => Base::entropy(2),
            'y' => 'q',
            'q' => 'find_node',
            'a' => array(
                'id' => $server->nid,
                'target' => $mid
            )
        );

        $monitor->sendTo($address,$msg,'find_node');
    };

    /**
     * Search the network for peers of $infohash: fetch the closest known
     * nodes from the local table service and send each a get_peers query.
     *
     * @param string $infohash info-hash to look up, as received in the query
     */
    $monitor->getPeers = function($infohash) use ($monitor,$server){
        Log::record("find infohash peers\n");
        // Same table-service endpoint (port 226) as every other lookup in
        // this file; the port was missing here.
        $table_conn = new AsyncTcpConnection('frame://127.0.0.1:226');
        // $infohash has to be captured explicitly — closures do not inherit
        // the enclosing scope, so it was undefined inside this callback.
        $table_conn->onConnect = function($conn) use ($infohash){
            $table_msg['action'] = 'getNodes';
            $table_msg['data']['target'] = $infohash;
            $conn->send(serialize($table_msg));
        };
        $table_conn->onMessage = function($conn,$data) use($monitor,$server,$infohash){
            $data = unserialize($data);
            // One request per connection: release the socket.
            $conn->close();
            if(is_array($data) && !isset($data['error'])){
                foreach ($data as $value) {
                    $get_msg['t'] = Base::entropy(2);
                    $get_msg['y'] = 'q';
                    $get_msg['q'] = 'get_peers';
                    $get_msg['a'] = [
                        'id' =>$server->nid,
                        'info_hash'=>$infohash
                    ];
                    $monitor->sendTo('udp://'.$value->ip.':'.$value->port,$get_msg,'get_peers');
                }
            }
        };
        // Nothing happens until the connection is actually opened; this call
        // was missing, so the lookup never ran.
        $table_conn->connect();
    };

    // onGetPeers reaches this closure through $server, so expose it there.
    $server->getPeers = $monitor->getPeers;


    /**
     * Send one KRPC message to $address from local port 4788.
     *
     * The action is recorded under the message's transaction id so that
     * onMessage can route the matching response. The statement order below
     * (unlisten -> send from a socket bound to 4788 -> listen) is deliberate
     * and must be kept.
     *
     * @param string $address destination, e.g. "udp://1.2.3.4:6881"
     * @param array  $msg     message to send; $msg['t'] is the transaction id
     * @param string $action  action name stored for response routing
     */
    $monitor->sendTo = function($address,$msg,$action = 'ping')  use ($monitor,$server){

        echo "send to $address\n";

        // Remember what this transaction was, keyed by its id.
        $server->conn_action[$msg['t']] = $action;

        // Earlier approach (disabled): a dedicated worker per destination.
        // $sender = new BaseWorker($address);
        // $sender->reusePort = true;
        // $sender->protocol = $server->protocol;
        // $sender->onConnect = function($conn) use ($msg,$server,$sender,$action){
        //     $conn->action = $action;
        //     $conn->send($msg);
        //     $sender->close();
        //     $sender->stop();
        // };
        // $sender->listen();

        // Stop listening before the outgoing socket is created.
        // NOTE(review): presumably needed so the socket below can bind the
        // same port 4788 — confirm.
        $monitor->unlisten();

        
        // foreach ($server->connections as $connection) {
        //     $connection->close();
        // }

        // sleep(1);

        // connect to the internet using port '4788'
        $opts = array(
            'socket' => array(
                'bindto' => '0:4788',
            ),
        );

        $find_conn = new AsyncUdpConnection($address,$opts);

        $find_conn->worker = $monitor;

        $find_conn->action = $action;

        // Use the monitor's codec for the outgoing message.
        $find_conn->protocol = $monitor->protocol;

        // $find_conn->onConnect = function($connection) use ($msg,$server){
            // $connection->close($msg);
            // $server->listen();
        // };

        // $find_conn->connect();

        // NOTE(review): close($msg) is expected to send $msg and then close
        // the connection (Workerman close-with-data) — confirm for the
        // installed Workerman version.
        $find_conn->close($msg);

        // Resume listening so the response can be received.
        $monitor->listen();
        
    };

    /**
     * Build a Node record from a connection: its nid property plus the
     * remote IP and port reported by the connection.
     *
     * @param mixed $conn connection exposing nid, getRemoteIp(), getRemotePort()
     * @return Node
     */
    $monitor->toNode = function($conn){
        $node_id = $conn->nid;
        $remote_ip = $conn->getRemoteIp();
        $remote_port = $conn->getRemotePort();

        return new Node($node_id,$remote_ip,$remote_port);
    };

    // $server->unlisten();

    Log::record('server start');
    $monitor->joinDht();

    // Routing-table refresh, run every $time_interval seconds (20 s): ask the
    // table service for nodes near a random id and send each a find_node
    // query. If the table service reports an error, bootstrap again.
    $time_interval = 20;
    Timer::add($time_interval, function() use($monitor)
    {
        // Log::record('timing find node');
        $table_msg['action'] = 'getNodes';
        $table_msg['data']['target'] = Base::get_node_id();
        $table_conn = new AsyncTcpConnection('frame://127.0.0.1:226');
        $table_conn->onConnect = function($conn) use ($table_msg){
            $conn->send(serialize($table_msg));
        };
        $table_conn->onMessage = function($conn,$data) use($monitor){
            $data = unserialize($data);
            // One request per connection: close it, otherwise every tick of
            // this timer leaves another open TCP connection behind.
            $conn->close();
            if(isset($data['error'])){
                // Log::record('get node status:'.$data['error'] );
                $monitor->joinDht();
            }elseif(is_array($data)){
                // Guarded with is_array(): unserialize() yields false for a
                // malformed reply, which must not reach foreach.
                foreach ($data as $node_value) {

                    $monitor->findNode("udp://$node_value->ip:$node_value->port");
                }
            }
        };
        $table_conn->onError = function($table_conn,$code,$msg){
            // Connection errors are deliberately ignored; the next tick retries.
            // Log::record('connect to table error,error code:'.$code.":".$msg);
        };
        $table_conn->connect();
    });

    
};



// Hand control to the worker runtime; everything above only registers
// state and callbacks.
BaseWorker::runAll();